I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different exception (still serialization-related, though):
java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Then let's keep fingers crossed that we've found the culprit :-)
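For background on what "Index: 97, Size: 11" in MapReferenceResolver.getReadObject means: with reference tracking enabled, Kryo writes repeated objects as small integer ids into a per-instance table, so a reader whose table is out of sync (for example because a stateful serializer was shared across threads, which is what the OptionSerializer duplication bug mentioned below would cause) asks for an id the table does not have. A minimal standalone sketch of the mechanism, using plain Kryo rather than Flink code; all names here are illustrative, not from the thread:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream
import java.util.ArrayList

object ReferenceTrackingDemo {
  def main(args: Array[String]): Unit = {
    val writer = new Kryo()
    writer.setReferences(true) // repeated objects are written as back-reference ids

    val shared = new ArrayList[String]()
    shared.add("a")
    // The second occurrence of `shared` is serialized as a reference id,
    // not as a full object.
    val record = Array[AnyRef](shared, shared)

    val bytes = new ByteArrayOutputStream()
    val out = new Output(bytes)
    writer.writeClassAndObject(out, record)
    out.close()

    // A reader whose reference state does not match the writer's (here,
    // references disabled) misinterprets those ids as data. This typically
    // fails or yields garbage, the same class of mismatch that surfaces as
    // IndexOutOfBoundsException in MapReferenceResolver.
    val reader = new Kryo()
    reader.setReferences(false)
    try {
      println(reader.readClassAndObject(new Input(bytes.toByteArray)))
    } catch {
      case e: Exception => println(s"mismatched reference state: $e")
    }
  }
}
```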
>
> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>
>> Thank you Till.
>>
>> I will try to run with the new binaries today. As I have mentioned, the error is reproducible only on a full dataset, so coming up with sample input data may be problematic (not to mention that the real data can't be shared). I'll see if I can replicate it, but it could take a bit longer. Thank you very much for your effort.
>>
>> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Timur,
>>>
>>> I’ve got good and not so good news. Let’s start with the not so good news: I couldn’t reproduce your problem. The good news is that I found a bug in the duplication logic of the OptionSerializer. I’ve already committed a patch to the master to fix it.
>>>
>>> Thus, I wanted to ask you whether you could try out the latest master and check whether your problem still persists. If that’s the case, could you send me your complete code with sample input data which reproduces your problem?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like it's failing when trying to deserialize the traits that are wrapped in Options.
>>>>
>>>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>>> Hey Timur,
>>>>>
>>>>> I'm sorry about this bad experience.
>>>>>
>>>>> From what I can tell, there is nothing unusual with your code. It's probably an issue with Flink.
>>>>>
>>>>> I think we have to wait a little longer to hear what others in the community say about this.
>>>>>
>>>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>> > Still trying to resolve this serialization issue. I was able to hack it by 'serializing' `Record` to String and then 'deserializing' it in coGroup, but boy it's so ugly.
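The actual code of that hack is not shown in the thread; a hypothetical reconstruction, assuming the simplified `Record`/`Name`/`Phone`/`Address` shapes quoted further down and a separator character that never occurs in the data, might look something like this:

```scala
// Hypothetical reconstruction of the String round-trip workaround, not the
// code from the thread. Uses the simplified case classes quoted below.
object StringRoundTrip {
  val Sep = '\u0001' // assumption: this character never occurs in the data

  def encode(r: Record): String = Seq(
    r.n.first, r.n.last,
    r.phone.map(_.number).getOrElse(""),
    r.addr.map(a => s"${a.addr}|${a.city}|${a.country}").getOrElse("")
  ).mkString(Sep.toString)

  def decode(s: String): Record = {
    val f = s.split(Sep) // note: split drops trailing empty fields
    val phone = if (f.length > 2 && f(2).nonEmpty) Some(Phone(f(2))) else None
    val addr = if (f.length > 3 && f(3).nonEmpty) {
      val Array(a, c, co) = f(3).split('|')
      Some(Address(a, c, co))
    } else None
    Record(Name(f(0), f(1)), phone, addr)
  }
}
```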
>>>>> >
>>>>> > So the bug is that it can't deserialize the case class that has the following structure (slightly different and more detailed than I stated above):
>>>>> > ```
>>>>> > case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>> >
>>>>> > case class Name(givenName: Option[String], middleName: Option[String], familyName: Option[String], generationSuffix: Option[String] = None)
>>>>> >
>>>>> > trait Address {
>>>>> >   val city: String
>>>>> >   val state: String
>>>>> >   val country: String
>>>>> >   val latitude: Double
>>>>> >   val longitude: Double
>>>>> >   val postalCode: String
>>>>> >   val zip4: String
>>>>> >   val digest: String
>>>>> > }
>>>>> >
>>>>> > case class PoBox(city: String,
>>>>> >                  state: String,
>>>>> >                  country: String,
>>>>> >                  latitude: Double,
>>>>> >                  longitude: Double,
>>>>> >                  postalCode: String,
>>>>> >                  zip4: String,
>>>>> >                  digest: String,
>>>>> >                  poBox: String
>>>>> >                 ) extends Address
>>>>> >
>>>>> > case class PostalAddress(city: String,
>>>>> >                          state: String,
>>>>> >                          country: String,
>>>>> >                          latitude: Double,
>>>>> >                          longitude: Double,
>>>>> >                          postalCode: String,
>>>>> >                          zip4: String,
>>>>> >                          digest: String,
>>>>> >                          preDir: String,
>>>>> >                          streetName: String,
>>>>> >                          streetType: String,
>>>>> >                          postDir: String,
>>>>> >                          house: String,
>>>>> >                          aptType: String,
>>>>> >                          aptNumber: String
>>>>> >                         ) extends Address
>>>>> > ```
>>>>> >
>>>>> > I would expect serialization to be one of Flink's cornerstones and to be well tested, so there is a good chance that I'm doing something wrong, but I can't really find anything unusual in my code.
>>>>> >
>>>>> > Any suggestions on what to try are highly welcome.
>>>>> >
>>>>> > Thanks,
>>>>> > Timur
>>>>> >
>>>>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello Robert,
>>>>> >>
>>>>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue with the cluster (that I didn't dig into); when I restarted the cluster I was able to get past it, so now I have the following exception:
>>>>> >>
>>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization.
>>>>> >> If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>> >>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>> >>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> >>     at java.lang.Thread.run(Thread.java:745)
>>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> >>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>> >>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>> >>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>> >>     ... 3 more
>>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>> >>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>> >>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>> >>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> >>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>> >>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>>> >>     at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>>> >>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>> >>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>> >>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>> >>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>> >>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>> >>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>> >>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>> >>     ... 5 more
>>>>> >>
>>>>> >> Thanks,
>>>>> >> Timur
>>>>> >>
>>>>> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>> >>>
>>>>> >>> For the second exception, can you check the logs of the failing taskmanager (10.105.200.137)? I guess these logs contain some details on why the TM timed out.
>>>>> >>>
>>>>> >>> Are you on 1.0.x or on 1.1-SNAPSHOT? We recently changed something related to the ExecutionConfig which has led to Kryo issues in the past.
>>>>> >>>
>>>>> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Trying to use ProtobufSerializer -- the program consistently fails with the following exception:
>>>>> >>>>
>>>>> >>>> java.lang.IllegalStateException: Update task on instance 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>>> >>>>     at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>>> >>>>     at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>>> >>>>     at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>>> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>>> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>>> >>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>> >>>>     at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>>> >>>>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>>> >>>>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>>> >>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>> >>>>     at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>>> >>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> >>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> >>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> >>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] after [10000 ms]
>>>>> >>>>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>>> >>>>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>>> >>>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>> >>>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>> >>>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>> >>>>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>>> >>>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>>> >>>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>>> >>>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> >>>>
>>>>> >>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>>> >>>>
>>>>> >>>> Thanks,
>>>>> >>>> Timur
>>>>> >>>>
>>>>> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>> >>>>>
>>>>> >>>>> Hello,
>>>>> >>>>>
>>>>> >>>>> I'm running a Flink program that is failing with the following exception:
>>>>> >>>>>
>>>>> >>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
>>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>> >>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>> >>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>> >>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>> >>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>> >>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>> >>>>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>> >>>>>     at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>> >>>>>     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> >>>>>     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>> >>>>>     at scala.Option.foreach(Option.scala:257)
>>>>> >>>>>     at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>> >>>>>     at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>> >>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> >>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>> >>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> >>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>> >>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> >>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> >>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> >>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> >>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> >>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> >>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> >>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>> >>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>> >>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> >>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>> >>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>> >>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>> >>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>> >>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>> >>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> >>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>> >>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>> >>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> >>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> >>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>> >>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>> >>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>> >>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> >>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>> >>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>> >>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>> >>>>>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>> >>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>> >>>>>     ... 3 more
>>>>> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>> >>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> >>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>> >>>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>> >>>>>     at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>> >>>>>     at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>> >>>>>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>> >>>>>     at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>> >>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> >>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>> >>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>> >>>>>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>> >>>>>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>> >>>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>> >>>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>> >>>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>> >>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>> >>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>> >>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>> >>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>> >>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>> >>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>> >>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>> >>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> >>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> >>>>>
>>>>> >>>>> The simplified version of the code looks more or less like the following:
>>>>> >>>>> ```
>>>>> >>>>> case class Name(first: String, last: String)
>>>>> >>>>> case class Phone(number: String)
>>>>> >>>>> case class Address(addr: String, city: String, country: String)
>>>>> >>>>> case class Record(n: Name, phone: Option[Phone], addr: Option[Address])
>>>>> >>>>> ...
>>>>> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>>>> >>>>> ...
>>>>> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>> >>>>>
>>>>> >>>>> val helper: DataSet[(Name, String)] = ...
>>>>> >>>>>
>>>>> >>>>> val result = data.filter(_.addr.isDefined)
>>>>> >>>>>   .coGroup(helper)
>>>>> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.n, e.addr.get.country)))
>>>>> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>> >>>>>   .apply {resolutionFunc}
>>>>> >>>>>   .filter(_ != "")
>>>>> >>>>>
>>>>> >>>>> result.writeAsText(...)
>>>>> >>>>> ```
>>>>> >>>>>
>>>>> >>>>> This code fails only when I run it on the full dataset; when I split `data` into smaller chunks (`helper` always stays the same), I'm able to complete successfully. I guess with smaller memory requirements serialization/deserialization does not kick in.
>>>>> >>>>>
>>>>> >>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>>> >>>>> ```
>>>>> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
>>>>> >>>>> ```
>>>>> >>>>> but every run takes significant time before failing, so any other advice is appreciated.
>>>>> >>>>>
>>>>> >>>>> Thanks,
>>>>> >>>>> Timur
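A closing note on the trait-typed fields: since PoBox and PostalAddress are only ever seen by Flink as Address (wrapped in an Option), they fall back to generic Kryo serialization. A minimal sketch of registering the concrete subtypes up front, using the class names from this thread; whether this helps with the corruption above is an assumption to verify, not a confirmed fix:

```scala
import org.apache.flink.api.scala.ExecutionEnvironment

// Sketch: make the concrete Address subtypes known to Kryo explicitly,
// so they get stable registration ids instead of being written generically.
object RegisterTypes {
  def configure(env: ExecutionEnvironment): Unit = {
    env.getConfig.registerKryoType(classOf[PoBox])
    env.getConfig.registerKryoType(classOf[PostalAddress])
    env.getConfig.registerKryoType(classOf[Name])
  }
}
```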