Trying to use ProtobufSerializer -- program consistently fails with the
following exception:

java.lang.IllegalStateException: Update task on instance
52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
at
org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
at akka.dispatch.OnFailure.internal(Future.scala:228)
at akka.dispatch.OnFailure.internal(Future.scala:227)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
after [10000 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

I'm at my wits' end now, any suggestions are highly appreciated.

Thanks,
Timur


On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hello,
>
> I'm running a Flink program that is failing with the following exception:
>
> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>                   - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> at scala.Option.foreach(Option.scala:257)
> at
> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
> CoGroup (CoGroup at
> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
> Filter (Filter at
> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
> caused an error: Error obtaining the sorted input: Thread 'SortMerger
> Reading Thread' terminated due to an exception: No more bytes left.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: No more
> bytes left.
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: No more bytes left.
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException: No more bytes left.
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
> at
> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
> at
> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> The simplified version of the code looks more or less like following:
> ```
> case class Name(first: String, last: String)
> case class Phone(number: String)
> case class Address(addr: String, city: String, country: String)
> case class Record(n: Name, phone: Option[Phone], addr: Option[Address])
> ...
> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)] => String
> = ...
> ...
> val data = env.readCsvFile[MySchema](...).map(Record(_))
>
> val helper: DataSet[(Name, String)] = ...
>
> val result = data.filter(_.address.isDefined)
>   .coGroup(helper)
>   .where(e => LegacyDigest.buildMessageDigest((e.name,
> e.address.get.country)))
>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>   .apply {resolutionFunc}
>   .filter(_ != "")
>
> result.writeAsText(...)
> ```
>
> This code fails only when I run it on the full dataset, when I split the
> `data` on smaller chunks (`helper` always stays the same), I'm able to
> complete successfully. I guess with smaller memory requirements
> serialization/deserialization does not kick in.
>
> I'm trying now to explicitly set Protobuf serializer for Kryo:
> ```
> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
> classOf[ProtobufSerializer])
>
> ```
> but every run takes significant time before failing, so any other advice
> is appreciated.
>
> Thanks,
> Timur
>

Reply via email to