Hi Timur,

could you try excluding the older Kryo dependency from twitter.carbonite via

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>carbonite</artifactId>
    <version>1.4.0</version>
    <exclusions>
        <exclusion>
            <artifactId>kryo</artifactId>
            <groupId>com.esotericsoftware.kryo</groupId>
        </exclusion>
    </exclusions>
</dependency>

and see whether this solves your problem? If the problem still persists, could you share your pom file with us?

Cheers,
Till
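(A quick way to double-check which Kryo versions actually end up on the classpath, assuming a standard Maven setup, is to run `mvn dependency:tree -Dincludes=com.esotericsoftware.kryo` in the project; after adding the exclusion above, the 2.21 artifact pulled in through carbonite should no longer appear in the tree.)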
On Wed, Apr 27, 2016 at 8:34 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

> Hi Ken,
>
> Good point actually, thanks for pointing this out. In the Flink project I
> see a dependency on Kryo 2.24, and then transitive dependencies through
> twitter.carbonite bring in 2.21. Also, twitter.chill, which as far as I
> understand is used to manipulate Kryo, shows up with versions 0.7.4 (Flink)
> and 0.3.5 (carbonite again). I wonder if this could cause issues, since if
> classes are not deduplicated the class loading order could differ between
> machines. The Maven project setup is fairly complicated and I'm not a Maven
> expert, so I would appreciate a second look at it.
>
> Thanks,
> Timur
>
> On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
>> I don’t know if this is helpful, but I’d run into a similar issue (array
>> index out of bounds during Kryo deserialization) due to having a different
>> version of Kryo on the classpath.
>>
>> — Ken
>>
>> On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>> I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different
>> exception (still serialization-related, though):
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>   ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
>>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>   at java.util.ArrayList.get(ArrayList.java:429)
>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Then let's keep our fingers crossed that we've found the culprit :-)
>>>
>>> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>
>>>> Thank you Till.
>>>>
>>>> I will try to run with the new binaries today. As I have mentioned, the
>>>> error is reproducible only on a full dataset, so coming up with sample
>>>> input data may be problematic (not to mention that the real data can't
>>>> be shared). I'll see if I can replicate it, but it could take a bit
>>>> longer. Thank you very much for your effort.
>>>>
>>>> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Timur,
>>>>>
>>>>> I’ve got good and not so good news. Let’s start with the not so good
>>>>> news. I couldn’t reproduce your problem, but the good news is that I
>>>>> found a bug in the duplication logic of the OptionSerializer. I’ve
>>>>> already committed a patch to the master to fix it.
>>>>>
>>>>> Thus, I wanted to ask whether you could try out the latest master and
>>>>> check whether your problem still persists. If that’s the case, could
>>>>> you send me your complete code with sample input data which reproduces
>>>>> your problem?
>>>>>
>>>>> Cheers,
>>>>> Till
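To illustrate the duplication logic Till refers to, here is a minimal standalone sketch (the class names are illustrative, not Flink's actual OptionSerializer code): Flink duplicates serializers so that concurrently running tasks each get their own instance, and a wrapping serializer whose duplicate() does not also duplicate its stateful inner serializer silently shares one non-thread-safe Kryo instance across threads.

```scala
import com.esotericsoftware.kryo.Kryo

// Minimal sketch, not Flink's actual code: a Kryo-backed serializer is stateful
// (the Kryo instance keeps mutable read/write state), so it must not be shared
// between concurrently running tasks.
final class KryoBackedSerializer[T](clazz: Class[T]) {
  val kryo = new Kryo()
  def duplicate(): KryoBackedSerializer[T] = new KryoBackedSerializer[T](clazz)
}

// A wrapping serializer must propagate duplicate() to its inner serializer;
// the buggy variant would be `new OptionLikeSerializer(inner)`, which keeps
// sharing the stateful inner instance.
final class OptionLikeSerializer[T](val inner: KryoBackedSerializer[T]) {
  def duplicate(): OptionLikeSerializer[T] = new OptionLikeSerializer[T](inner.duplicate())
}

object DuplicationSketch extends App {
  val original = new OptionLikeSerializer(new KryoBackedSerializer(classOf[String]))
  val copy = original.duplicate()
  // With correct deep duplication the inner Kryo instances are distinct:
  println(original.inner.kryo eq copy.inner.kryo) // prints: false
}
```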
>>>>>
>>>>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>
>>>>>> Could this be caused by the disabled reference tracking in our Kryo
>>>>>> serializer? From the stack trace it looks like it's failing when trying
>>>>>> to deserialize the traits that are wrapped in Options.
>>>>>>
>>>>>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>
>>>>>>> Hey Timur,
>>>>>>>
>>>>>>> I'm sorry about this bad experience.
>>>>>>>
>>>>>>> From what I can tell, there is nothing unusual with your code. It's
>>>>>>> probably an issue with Flink.
>>>>>>>
>>>>>>> I think we have to wait a little longer to hear what others in the
>>>>>>> community say about this.
>>>>>>>
>>>>>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>>>>>
>>>>>>> – Ufuk
>>>>>>>
>>>>>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>> > Still trying to resolve this serialization issue. I was able to hack
>>>>>>> > it by 'serializing' `Record` to String and then 'deserializing' it in
>>>>>>> > coGroup, but boy is it ugly.
>>>>>>> >
>>>>>>> > So the bug is that it can't deserialize the case class that has the
>>>>>>> > following structure (slightly different and more detailed than I
>>>>>>> > stated above):
>>>>>>> > ```
>>>>>>> > case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>>>> >
>>>>>>> > case class Name(givenName: Option[String], middleName: Option[String],
>>>>>>> >                 familyName: Option[String], generationSuffix: Option[String] = None)
>>>>>>> >
>>>>>>> > trait Address {
>>>>>>> >   val city: String
>>>>>>> >   val state: String
>>>>>>> >   val country: String
>>>>>>> >   val latitude: Double
>>>>>>> >   val longitude: Double
>>>>>>> >   val postalCode: String
>>>>>>> >   val zip4: String
>>>>>>> >   val digest: String
>>>>>>> > }
>>>>>>> >
>>>>>>> > case class PoBox(city: String,
>>>>>>> >                  state: String,
>>>>>>> >                  country: String,
>>>>>>> >                  latitude: Double,
>>>>>>> >                  longitude: Double,
>>>>>>> >                  postalCode: String,
>>>>>>> >                  zip4: String,
>>>>>>> >                  digest: String,
>>>>>>> >                  poBox: String
>>>>>>> >                 ) extends Address
>>>>>>> >
>>>>>>> > case class PostalAddress(city: String,
>>>>>>> >                          state: String,
>>>>>>> >                          country: String,
>>>>>>> >                          latitude: Double,
>>>>>>> >                          longitude: Double,
>>>>>>> >                          postalCode: String,
>>>>>>> >                          zip4: String,
>>>>>>> >                          digest: String,
>>>>>>> >                          preDir: String,
>>>>>>> >                          streetName: String,
>>>>>>> >                          streetType: String,
>>>>>>> >                          postDir: String,
>>>>>>> >                          house: String,
>>>>>>> >                          aptType: String,
>>>>>>> >                          aptNumber: String
>>>>>>> >                         ) extends Address
>>>>>>> > ```
>>>>>>> >
>>>>>>> > I would expect that serialization is one of Flink's cornerstones and
>>>>>>> > should be well tested, so there is a high chance that I'm doing
>>>>>>> > something wrong, but I can't really find anything unusual in my code.
>>>>>>> >
>>>>>>> > Any suggestion on what to try is highly welcome.
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Timur
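A hedged side note on the Address hierarchy above (general Kryo hygiene, not a confirmed fix for this particular bug): eagerly registering the concrete subtypes makes Kryo registration IDs deterministic across all task managers, instead of depending on the order in which classes are first encountered. A sketch, assuming the PoBox and PostalAddress case classes from the message above are on the classpath:

```scala
import org.apache.flink.api.scala.ExecutionEnvironment

// Sketch: register the concrete Address subtypes with Kryo up front.
val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.registerKryoType(classOf[PoBox])
env.getConfig.registerKryoType(classOf[PostalAddress])
```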
>>>>>>> >
>>>>>>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> Hello Robert,
>>>>>>> >>
>>>>>>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
>>>>>>> >> issue with a cluster (which I didn't dig into); when I restarted the
>>>>>>> >> cluster I was able to get past it, so now I have the following
>>>>>>> >> exception:
>>>>>>> >>
>>>>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>>> >>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >>   at java.lang.Thread.run(Thread.java:745)
>>>>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> >>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>> >>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>>> >>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>> >>   ... 3 more
>>>>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> >>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> >>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>> >>   at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>>>>> >>   at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>>>>> >>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> >>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> >>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> >>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>>> >>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> >>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> >>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>> >>   ... 5 more
>>>>>>> >>
>>>>>>> >> Thanks,
>>>>>>> >> Timur
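When the error message says "check the corresponding Kryo serializer", a serialize/deserialize round trip through the same serializer the job would use can reproduce write/read asymmetry without a cluster run. A sketch using Flink's public serializer API (the object name is illustrative, and it assumes the Record and Name case classes quoted earlier in the thread are in scope):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

object RoundTripSketch extends App {
  // Mirror the job's ExecutionConfig here (Kryo registrations etc.) so the
  // same serializer tree is built as at runtime.
  val config = new ExecutionConfig
  val serializer = createTypeInformation[Record].createSerializer(config)

  val sample = Record(Name(Some("Jane"), None, Some("Doe")), None, None)

  // Serialize into a byte buffer, then read it back with the same serializer.
  val out = new ByteArrayOutputStream()
  serializer.serialize(sample, new DataOutputViewStreamWrapper(out))
  val back = serializer.deserialize(
    new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))

  println(if (back == sample) "round trip OK" else s"round trip mismatch: $back")
}
```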
>>>>>>> >>
>>>>>>> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>> >>>
>>>>>>> >>> For the second exception, can you check the logs of the failing
>>>>>>> >>> taskmanager (10.105.200.137)? I guess these logs contain some
>>>>>>> >>> details on why the TM timed out.
>>>>>>> >>>
>>>>>>> >>> Are you on 1.0.x or on 1.1-SNAPSHOT? We recently changed something
>>>>>>> >>> related to the ExecutionConfig which has led to Kryo issues in the
>>>>>>> >>> past.
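(For reference: in a standalone setup the TaskManager logs Robert mentions are typically found under the `log/` directory of the Flink installation on that host, in files named along the lines of `flink-<user>-taskmanager-<host>.log`; the first ERROR or WARN entries around the failure time usually explain why the TM became unreachable.)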
>>>>>>> >>>
>>>>>>> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Trying to use ProtobufSerializer -- the program consistently fails
>>>>>>> >>>> with the following exception:
>>>>>>> >>>>
>>>>>>> >>>> java.lang.IllegalStateException: Update task on instance 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>>>>> >>>>   at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>>>>> >>>>   at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>>>>> >>>>   at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>>>>> >>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>>>>> >>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>>>>> >>>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>> >>>>   at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>>>>> >>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>>>>> >>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>>>>> >>>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>>>> >>>>   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>>>>> >>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> >>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] after [10000 ms]
>>>>>>> >>>>   at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>>>>> >>>>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>>>>> >>>>   at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>>>> >>>>   at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>>>> >>>>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>>>> >>>>   at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>>>>> >>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>>>>> >>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>>>>> >>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>> >>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>>>
>>>>>>> >>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>>>>> >>>>
>>>>>>> >>>> Thanks,
>>>>>>> >>>> Timur
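(Two hedged observations on the attempt above: if the ProtobufSerializer being registered is chill-protobuf's com.twitter.chill.protobuf.ProtobufSerializer, it is written for protobuf-generated Message classes, so registering it for a plain Scala case class like Record would not be expected to help. Separately, the 'Ask timed out ... after [10000 ms]' failure is a JobManager-to-TaskManager RPC timeout, tunable via `akka.ask.timeout` in flink-conf.yaml; such a timeout is usually a symptom of a TaskManager that died or stalled, so the TM logs are the place to look rather than the timeout itself.)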
>>>>>>> >>>>
>>>>>>> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> Hello,
>>>>>>> >>>>>
>>>>>>> >>>>> I'm running a Flink program that is failing with the following
>>>>>>> >>>>> exception:
>>>>>>> >>>>>
>>>>>>> >>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
>>>>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>>> >>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>>>> >>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>>>> >>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>>>> >>>>>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>>>> >>>>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>>> >>>>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>>>> >>>>>   at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>>>> >>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>> >>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>> >>>>>   at scala.Option.foreach(Option.scala:257)
>>>>>>> >>>>>   at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>>>> >>>>>   at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>>>> >>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> >>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> >>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> >>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>> >>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>> >>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>> >>>>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>> >>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>> >>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>> >>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> >>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> >>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>>>> >>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>> >>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>> >>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>>> >>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>>> >>>>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>>> >>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>>>> >>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>>>> >>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>>>> >>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>> >>>>>   ... 3 more
>>>>>>> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> >>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>>>> >>>>>   at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>> >>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>>>> >>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>> >>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>>>> >>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>>>> >>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>>>> >>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>>>> >>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>>>> >>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>>> >>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> >>>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> >>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>>>> >>>>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >>>>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> >>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> >>>>>
>>>>>>> >>>>> The simplified version of the code looks more or less like the following:
>>>>>>> >>>>> ```
>>>>>>> >>>>> case class Name(first: String, last: String)
>>>>>>> >>>>> case class Phone(number: String)
>>>>>>> >>>>> case class Address(addr: String, city: String, country: String)
>>>>>>> >>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>>>> >>>>> ...
>>>>>>> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>>>>>> >>>>> ...
>>>>>>> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>>> >>>>>
>>>>>>> >>>>> val helper: DataSet[(Name, String)] = ...
>>>>>>> >>>>>
>>>>>>> >>>>> val result = data.filter(_.address.isDefined)
>>>>>>> >>>>>   .coGroup(helper)
>>>>>>> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>>>>> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>>> >>>>>   .apply { resolutionFunc }
>>>>>>> >>>>>   .filter(_ != "")
>>>>>>> >>>>>
>>>>>>> >>>>> result.writeAsText(...)
>>>>>>> >>>>> ```
>>>>>>> >>>>>
>>>>>>> >>>>> This code fails only when I run it on the full dataset; when I
>>>>>>> >>>>> split the `data` into smaller chunks (`helper` always stays the
>>>>>>> >>>>> same), I'm able to complete successfully. I guess with smaller
>>>>>>> >>>>> memory requirements serialization/deserialization does not kick in.
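A sketch of the chunked debugging described above (MySchema, Record, and the input path are stand-ins from the original message): running the pipeline on a bounded sample in a local environment makes it cheap to grow the input until the failure reproduces.

```scala
import org.apache.flink.api.scala._

// Sketch: bisect the failure by growing a bounded local sample.
val env = ExecutionEnvironment.createLocalEnvironment()
val sample = env.readCsvFile[MySchema]("/path/to/input.csv")
  .first(100000) // increase until the failure reproduces
  .map(Record(_))
```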
>>>>>>> >>>>>
>>>>>>> >>>>> I'm trying now to explicitly set the Protobuf serializer for Kryo:
>>>>>>> >>>>> ```
>>>>>>> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
>>>>>>> >>>>> ```
>>>>>>> >>>>> but every run takes significant time before failing, so any other
>>>>>>> >>>>> advice is appreciated.
>>>>>>> >>>>>
>>>>>>> >>>>> Thanks,
>>>>>>> >>>>> Timur

>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr