Hey Timur, I'm sorry to hear about this bad experience.
From what I can tell, there is nothing unusual with your code. It's probably an issue with Flink. I think we have to wait a little longer to hear what others in the community say about this. @Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk

On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <[email protected]> wrote:
> Still trying to resolve this serialization issue. I was able to hack around it
> by 'serializing' `Record` to a String and then 'deserializing' it in coGroup,
> but boy is it ugly.
>
> So the bug is that Flink can't deserialize a case class that has the
> following structure (slightly different and more detailed than I stated above):
> ```
> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>
> case class Name(givenName: Option[String],
>                 middleName: Option[String],
>                 familyName: Option[String],
>                 generationSuffix: Option[String] = None)
>
> trait Address {
>   val city: String
>   val state: String
>   val country: String
>   val latitude: Double
>   val longitude: Double
>   val postalCode: String
>   val zip4: String
>   val digest: String
> }
>
> case class PoBox(city: String,
>                  state: String,
>                  country: String,
>                  latitude: Double,
>                  longitude: Double,
>                  postalCode: String,
>                  zip4: String,
>                  digest: String,
>                  poBox: String) extends Address
>
> case class PostalAddress(city: String,
>                          state: String,
>                          country: String,
>                          latitude: Double,
>                          longitude: Double,
>                          postalCode: String,
>                          zip4: String,
>                          digest: String,
>                          preDir: String,
>                          streetName: String,
>                          streetType: String,
>                          postDir: String,
>                          house: String,
>                          aptType: String,
>                          aptNumber: String) extends Address
> ```
>
> I would expect serialization to be one of Flink's cornerstones and well
> tested, so there is a high chance I'm doing something wrong, but I can't
> really find anything unusual in my code.
>
> Any suggestion on what to try is highly welcome.
>
> Thanks,
> Timur
>
>
> On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <[email protected]> wrote:
>>
>> Hello Robert,
>>
>> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
>> with the cluster (that I didn't dig into); when I restarted the cluster I
>> was able to get past it, so now I have the following exception:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Serializer consumed more
>> bytes than the record had. This indicates broken serialization. If you are
>> using custom serialization types (Value or Writable), check their
>> serialization methods. If you are using a Kryo-serialized type, check the
>> corresponding Kryo serializer.
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>   ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>   at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>   at org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>   ... 5 more
>>
>> Thanks,
>> Timur
>>
>> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <[email protected]> wrote:
>>>
>>> For the second exception, can you check the logs of the failing
>>> taskmanager (10.105.200.137)?
>>> I guess these logs contain some details on why the TM timed out.
>>>
>>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>> We recently changed something related to the ExecutionConfig which has
>>> led to Kryo issues in the past.
>>>
>>>
>>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <[email protected]> wrote:
>>>>
>>>> Trying to use ProtobufSerializer -- the program consistently fails with
>>>> the following exception:
>>>>
>>>> java.lang.IllegalStateException: Update task on instance
>>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>> akka.tcp://[email protected]:48990/user/taskmanager failed due to:
>>>>   at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>>   at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>>   at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>   at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>>   at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>   at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> [Actor[akka.tcp://[email protected]:48990/user/taskmanager#1418296501]]
>>>> after [10000 ms]
>>>>   at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>>   at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>>   at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>   at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>   at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>   at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>>   at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I'm at my wits' end now, any suggestions are highly appreciated.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>>
>>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <[email protected]> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running a Flink program that is failing with the following
>>>>> exception:
>>>>>
>>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>>>>   - Error while running the command.
>>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>>> execution failed: Job execution failed.
>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>>   at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>>   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>>   at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>   at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>   at scala.Option.foreach(Option.scala:257)
>>>>>   at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>>   at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>>>> CoGroup (CoGroup at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>>> Filter (Filter at
>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))'
>>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>> exception: No more bytes left.
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>   at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>   ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: No more bytes left.
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>>   at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>>   at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>>   at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>>   at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>>   at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>>   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>> The simplified version of the code looks more or less like the following:
>>>>> ```
>>>>> case class Name(first: String, last: String)
>>>>> case class Phone(number: String)
>>>>> case class Address(addr: String, city: String, country: String)
>>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>> ...
>>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>>>> ...
>>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>
>>>>> val helper: DataSet[(Name, String)] = ...
>>>>>
>>>>> val result = data.filter(_.address.isDefined)
>>>>>   .coGroup(helper)
>>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>   .apply { resolutionFunc }
>>>>>   .filter(_ != "")
>>>>>
>>>>> result.writeAsText(...)
>>>>> ```
>>>>>
>>>>> This code fails only when I run it on the full dataset; when I split
>>>>> `data` into smaller chunks (`helper` always stays the same), it
>>>>> completes successfully. I guess with smaller memory requirements
>>>>> serialization/deserialization does not kick in.
>>>>>
>>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>>> ```
>>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
>>>>> ```
>>>>> but every run takes significant time before failing, so any other
>>>>> advice is appreciated.
>>>>>
>>>>> Thanks,
>>>>> Timur
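The "serialize `Record` to a String, deserialize it in coGroup" hack mentioned at the top of the thread can be sketched roughly as follows. This is a minimal sketch, not Timur's actual code: the `encode`/`decode` helpers and the pipe-delimited format are hypothetical (and assume field values never contain the delimiter); any encoding that round-trips the fields would do.

```scala
// Hypothetical sketch of the "serialize to String" workaround: flatten a
// case class with Option fields into a plain String before shipping it
// through Flink, and parse it back inside the coGroup function.
case class Name(first: String, last: String)
case class Record(name: Name, country: Option[String])

// Encode a Record as a pipe-delimited String (None becomes an empty field).
def encode(r: Record): String =
  Seq(r.name.first, r.name.last, r.country.getOrElse("")).mkString("|")

// Decode the String back into a Record. The -1 limit keeps trailing
// empty fields, so "a|b|" splits into three parts, not two.
def decode(s: String): Record = {
  val parts = s.split("\\|", -1)
  Record(Name(parts(0), parts(1)),
         if (parts(2).isEmpty) None else Some(parts(2)))
}

val r = Record(Name("Timur", "Fayruzov"), None)
assert(decode(encode(r)) == r)
```

Because only plain Strings cross the wire, this sidesteps the failing `OptionSerializer`/Kryo deserialization path entirely -- ugly, as Timur says, but a workable stopgap while the underlying bug is investigated.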
