Hello, I'm running a Flink program that is failing with the following exception:

2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
    at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
    at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
    at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
    at scala.Option.foreach(Option.scala:257)
    at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
    at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
    at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
    at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

A simplified version of the code looks roughly like this:

```
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(name: Name, phone: Option[Phone], address: Option[Address])
...
def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
...
val data = env.readCsvFile[MySchema](...).map(Record(_))
val helper: DataSet[(Name, String)] = ...
val result = data.filter(_.address.isDefined)
  .coGroup(helper)
  .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
  .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
  .apply {resolutionFunc}
  .filter(_ != "")
result.writeAsText(...)
```

This code fails only when I run it on the full dataset. When I split `data` into smaller chunks (`helper` always stays the same), the job completes successfully. My guess is that with smaller memory requirements serialization/deserialization does not kick in.

I'm now trying to explicitly set the Protobuf serializer for Kryo:

```
env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
```

but every run takes a significant amount of time before failing, so any other advice is appreciated.

Thanks,
Timur