Hello Robert,

I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue with the cluster (which I didn't dig into); after restarting the cluster I was able to get past it. Now I have the following exception:
java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
    at org.apache.flink.types.StringValue.readString(StringValue.java:771)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    ... 5 more

Thanks,
Timur

On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:

> For the second exception, can you check the logs of the failing
> taskmanager (10.105.200.137)?
> I guess these logs contain some details on why the TM timed out.
>
> Are you on 1.0.x or on 1.1-SNAPSHOT?
> We recently changed something related to the ExecutionConfig, which has
> led to Kryo issues in the past.
>
> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>
>> Trying to use ProtobufSerializer -- the program consistently fails with
>> the following exception:
>>
>> java.lang.IllegalStateException: Update task on instance
>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>     at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>     at akka.dispatch.OnFailure.internal(Future.scala:228)
>>     at akka.dispatch.OnFailure.internal(Future.scala:227)
>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>     at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>     at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>     at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>> after [10000 ms]
>>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> I'm at my wits' end now; any suggestions are highly appreciated.
>>
>> Thanks,
>> Timur
>>
>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm running a Flink program that is failing with the following exception:
>>>
>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>     at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>     at scala.Option.foreach(Option.scala:257)
>>>     at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>     at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>> execution failed.
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
>>> CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
>>> Reading Thread' terminated due to an exception: No more bytes left.
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: No more
>>> bytes left.
>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>     ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: No more bytes left.
>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException: No more bytes left.
>>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>     at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>     at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>     at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>> A simplified version of the code looks more or less like the following:
>>> ```
>>> case class Name(first: String, last: String)
>>> case class Phone(number: String)
>>> case class Address(addr: String, city: String, country: String)
>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>> ...
>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>> ...
>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>
>>> val helper: DataSet[(Name, String)] = ...
>>>
>>> val result = data.filter(_.address.isDefined)
>>>   .coGroup(helper)
>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>   .apply {resolutionFunc}
>>>   .filter(_ != "")
>>>
>>> result.writeAsText(...)
>>> ```
>>>
>>> This code fails only when I run it on the full dataset; when I split
>>> `data` into smaller chunks (`helper` always stays the same), it completes
>>> successfully. I guess with smaller memory requirements
>>> serialization/deserialization does not kick in.
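(For illustration -- this sketch is not from the original message: one way to narrow down a suspected serialization problem in the pipeline above is to precompute the digest key with a `map`, so that `where`/`equalTo` select a plain tuple field instead of invoking `LegacyDigest` inside the key selectors. The sketch assumes the case classes, `data`, `helper`, `resolutionFunc`, and `LegacyDigest` from the simplified code above, plus the standard `org.apache.flink.api.scala._` import.)

```
import org.apache.flink.api.scala._

// Compute the digest once per element; the coGroup then keys on a plain
// String field (position 0 of the tuple).
val keyedData: DataSet[(String, Record)] = data
  .filter(_.address.isDefined)
  .map(r => (LegacyDigest.buildMessageDigest((r.name, r.address.get.country)), r))

val keyedHelper: DataSet[(String, (Name, String))] = helper
  .map(h => (LegacyDigest.buildMessageDigest((h._1, h._2)), h))

val result: DataSet[String] = keyedData
  .coGroup(keyedHelper)
  .where(0)   // key by the precomputed digest
  .equalTo(0)
  .apply { (l, r) => resolutionFunc(l.map(_._2), r.map(_._2)) }
  .filter(_ != "")
```

(Keying on a materialized field also guarantees the digest is computed exactly once per record, which helps rule out nondeterminism in the key function as a source of broken sort/merge behavior.)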
>>>
>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>> ```
>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
>>> ```
>>> but every run takes significant time before failing, so any other advice
>>> is appreciated.
>>>
>>> Thanks,
>>> Timur
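(A closing note, again not from the original thread: the "check the corresponding Kryo serializer" hint in the first exception refers to read/write symmetry, i.e. `read()` must consume exactly the bytes that `write()` produced. A minimal hand-written Kryo serializer for the `Name` case class from the simplified code might look like the sketch below; `NameSerializer` is a hypothetical name, and the registration call mirrors the `registerTypeWithKryoSerializer` usage above.)

```
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical, symmetric serializer for Name: read() consumes exactly the
// two strings that write() emitted, no more and no less.
class NameSerializer extends Serializer[Name] {
  override def write(kryo: Kryo, output: Output, name: Name): Unit = {
    output.writeString(name.first)
    output.writeString(name.last)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[Name]): Name =
    Name(input.readString(), input.readString())
}

// Register it the same way as the ProtobufSerializer above.
env.getConfig.registerTypeWithKryoSerializer(classOf[Name], classOf[NameSerializer])
```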