Hi Timur,

Could you try to exclude the older Kryo dependency from twitter.carbonite via

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>carbonite</artifactId>
    <version>1.4.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>

and see whether this solves your problem. If the problem still persists,
could you share your pom file with us?
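
If the exclusion alone doesn't do it, another option is to pin the Kryo
version for the whole build via dependencyManagement. A sketch, assuming
2.24.0 is the version Flink itself pulls in:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
            <version>2.24.0</version>
        </dependency>
    </dependencies>
</dependencyManagement>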

Cheers,
Till

On Wed, Apr 27, 2016 at 8:34 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hi Ken,
>
> Good point actually, thanks for pointing this out. In the Flink project I
> see a direct dependency on Kryo 2.24, and a transitive dependency on Kryo
> 2.21 through twitter.carbonite. Also, twitter.chill, which as far as I
> understand is used to manipulate Kryo, shows up in two versions: 0.7.4
> (from Flink) and 0.3.5 (from carbonite again). I wonder if this could cause
> issues: if classes are not deduplicated, the class loading order could
> differ between machines. The Maven project setup is fairly complicated and
> I'm not a Maven expert, so I would appreciate a second look at it.
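>
> For reference, this is how I pulled up the competing Kryo versions (plain
> Maven dependency tree, nothing Flink-specific):
>
> ```
> mvn dependency:tree -Dincludes=com.esotericsoftware.kryo
> ```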
>
> Thanks,
> Timur
>
>
> On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> I don’t know if this is helpful, but I’d run into a similar issue (array
>> index out of bounds during Kryo deserialization) due to having a different
>> version of Kryo on the classpath.
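>>
>> One quick way to see which Kryo actually wins at runtime is to print where
>> the class gets loaded from (a generic JVM trick, nothing Flink-specific):
>>
>> ```
>> println(classOf[com.esotericsoftware.kryo.Kryo]
>>   .getProtectionDomain.getCodeSource.getLocation)
>> ```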
>>
>> — Ken
>>
>> On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com>
>> wrote:
>>
>> I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different
>> exception (still serialization-related though):
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>> (CoGroup at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162))
>> -> Filter (Filter at
>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))'
>> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
>> Reading Thread' terminated due to an exception: Index: 97, Size: 11
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception: Index:
>> 97, Size: 11
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at
>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Index: 97, Size: 11
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
>> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>> at java.util.ArrayList.get(ArrayList.java:429)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>> at
>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
>> at
>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>> at
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>> at
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>> at
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>> at
>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>> at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>>
>> On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Then let's keep fingers crossed that we've found the culprit :-)
>>>
>>> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
>>>> Thank you Till.
>>>>
>>>> I will try to run with new binaries today. As I have mentioned, the
>>>> error is reproducible only on a full dataset, so coming up with sample
>>>> input data may be problematic (not to mention that the real data can't be
>>>> shared). I'll see if I can replicate it, but it could take a bit longer. Thank
>>>> you very much for your effort.
>>>>
>>>> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Timur,
>>>>>
>>>>> I’ve got good and not so good news. Let’s start with the not so good
>>>>> news. I couldn’t reproduce your problem, but the good news is that I
>>>>> found a bug in the duplication logic of the OptionSerializer. I’ve already
>>>>> committed a patch to the master to fix it.
>>>>>
>>>>> Thus, I wanted to ask you, whether you could try out the latest master
>>>>> and check whether your problem still persists. If that’s the case, could
>>>>> you send me your complete code with sample input data which reproduces
>>>>> your problem?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <
>>>>> aljos...@apache.org> wrote:
>>>>>
>>>>>> Could this be caused by the disabled reference tracking in our Kryo
>>>>>> serializer? From the stack trace it looks like it's failing when trying to
>>>>>> deserialize the traits that are wrapped in Options.
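>>>>>>
>>>>>> For reference, reference tracking is a plain Kryo switch; a minimal
>>>>>> sketch of the setting in question (plain Kryo, not the Flink API):
>>>>>>
>>>>>> ```
>>>>>> val kryo = new com.esotericsoftware.kryo.Kryo()
>>>>>> // With references disabled, Kryo neither writes nor expects the
>>>>>> // reference ids that MapReferenceResolver resolves on the read side.
>>>>>> kryo.setReferences(false)
>>>>>> ```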
>>>>>>
>>>>>> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <u...@apache.org> wrote:
>>>>>>
>>>>>>> Hey Timur,
>>>>>>>
>>>>>>> I'm sorry about this bad experience.
>>>>>>>
>>>>>>> From what I can tell, there is nothing unusual with your code. It's
>>>>>>> probably an issue with Flink.
>>>>>>>
>>>>>>> I think we have to wait a little longer to hear what others in the
>>>>>>> community say about this.
>>>>>>>
>>>>>>> @Aljoscha, Till, Robert: any ideas what might cause this?
>>>>>>>
>>>>>>> – Ufuk
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov
>>>>>>> <timur.fairu...@gmail.com> wrote:
>>>>>>> > Still trying to resolve this serialization issue. I was able to hack
>>>>>>> > around it by 'serializing' `Record` to String and then 'deserializing'
>>>>>>> > it in coGroup, but boy it's so ugly.
>>>>>>> >
>>>>>>> > So the bug is that it can't deserialize the case class that has the
>>>>>>> > structure (slightly different and more detailed than I stated above):
>>>>>>> > ```
>>>>>>> > case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>>>> >
>>>>>>> > case class Name(givenName: Option[String], middleName: Option[String],
>>>>>>> >   familyName: Option[String], generationSuffix: Option[String] = None)
>>>>>>> >
>>>>>>> > trait Address{
>>>>>>> >   val city: String
>>>>>>> >   val state: String
>>>>>>> >   val country: String
>>>>>>> >   val latitude: Double
>>>>>>> >   val longitude: Double
>>>>>>> >   val postalCode: String
>>>>>>> >   val zip4: String
>>>>>>> >   val digest: String
>>>>>>> > }
>>>>>>> >
>>>>>>> >
>>>>>>> > case class PoBox(city: String,
>>>>>>> >                  state: String,
>>>>>>> >                  country: String,
>>>>>>> >                  latitude: Double,
>>>>>>> >                  longitude: Double,
>>>>>>> >                  postalCode: String,
>>>>>>> >                  zip4: String,
>>>>>>> >                  digest: String,
>>>>>>> >                  poBox: String
>>>>>>> >                 ) extends Address
>>>>>>> >
>>>>>>> > case class PostalAddress(city: String,
>>>>>>> >                          state: String,
>>>>>>> >                          country: String,
>>>>>>> >                          latitude: Double,
>>>>>>> >                          longitude: Double,
>>>>>>> >                          postalCode: String,
>>>>>>> >                          zip4: String,
>>>>>>> >                          digest: String,
>>>>>>> >                          preDir: String,
>>>>>>> >                          streetName: String,
>>>>>>> >                          streetType: String,
>>>>>>> >                          postDir: String,
>>>>>>> >                          house: String,
>>>>>>> >                          aptType: String,
>>>>>>> >                          aptNumber: String
>>>>>>> >                         ) extends Address
>>>>>>> > ```
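>>>>>>> >
>>>>>>> > Since Address is a trait, the Option[Address] field falls back to
>>>>>>> > Kryo's generic serialization. One thing I haven't tried yet (just a
>>>>>>> > sketch) is registering the concrete subclasses explicitly, so that the
>>>>>>> > Kryo registrations are identical on every TaskManager:
>>>>>>> >
>>>>>>> > ```
>>>>>>> > env.getConfig.registerKryoType(classOf[PoBox])
>>>>>>> > env.getConfig.registerKryoType(classOf[PostalAddress])
>>>>>>> > ```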
>>>>>>> >
>>>>>>> > I would expect that serialization is one of Flink's cornerstones and
>>>>>>> > should be well tested, so there is a high chance that I'm doing
>>>>>>> > something wrong, but I can't really find anything unusual in my code.
>>>>>>> >
>>>>>>> > Any suggestion on what to try is highly welcome.
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Timur
>>>>>>> >
>>>>>>> >
>>>>>>> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <
>>>>>>> timur.fairu...@gmail.com>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hello Robert,
>>>>>>> >>
>>>>>>> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an
>>>>>>> >> issue with the cluster (which I didn't dig into); when I restarted the
>>>>>>> >> cluster I was able to get past it, so now I have the following
>>>>>>> >> exception:
>>>>>>> >>
>>>>>>> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup
>>>>>>> (CoGroup
>>>>>>> >> at
>>>>>>> >>
>>>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
>>>>>>> >> -> Filter (Filter at
>>>>>>> >>
>>>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))'
>>>>>>> >> , caused an error: Error obtaining the sorted input: Thread
>>>>>>> 'SortMerger
>>>>>>> >> Reading Thread' terminated due to an exception: Serializer
>>>>>>> consumed more
>>>>>>> >> bytes than the record had. This indicates broken serialization.
>>>>>>> If you are
>>>>>>> >> using custom serialization types (Value or Writable), check their
>>>>>>> >> serialization methods. If you are using a Kryo-serialized type,
>>>>>>> check the
>>>>>>> >> corresponding Kryo serializer.
>>>>>>> >> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>>> >> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >> at java.lang.Thread.run(Thread.java:745)
>>>>>>> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>> input:
>>>>>>> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>> >> Serializer consumed more bytes than the record had. This
>>>>>>> indicates broken
>>>>>>> >> serialization. If you are using custom serialization types (Value
>>>>>>> or
>>>>>>> >> Writable), check their serialization methods. If you are using a
>>>>>>> >> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>>> >> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>> >> ... 3 more
>>>>>>> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> >> terminated due to an exception: Serializer consumed more bytes
>>>>>>> than the
>>>>>>> >> record had. This indicates broken serialization. If you are using
>>>>>>> custom
>>>>>>> >> serialization types (Value or Writable), check their
>>>>>>> serialization methods.
>>>>>>> >> If you are using a Kryo-serialized type, check the corresponding
>>>>>>> Kryo
>>>>>>> >> serializer.
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> >> Caused by: java.io.IOException: Serializer consumed more bytes
>>>>>>> than the
>>>>>>> >> record had. This indicates broken serialization. If you are using
>>>>>>> custom
>>>>>>> >> serialization types (Value or Writable), check their
>>>>>>> serialization methods.
>>>>>>> >> If you are using a Kryo-serialized type, check the corresponding
>>>>>>> Kryo
>>>>>>> >> serializer.
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
>>>>>>> >> at
>>>>>>> org.apache.flink.types.StringValue.readString(StringValue.java:771)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> >> at
>>>>>>> >>
>>>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> >> at
>>>>>>> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>> >> ... 5 more
>>>>>>> >>
>>>>>>> >> Thanks,
>>>>>>> >> Timur
>>>>>>> >>
>>>>>>> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <
>>>>>>> rmetz...@apache.org>
>>>>>>> >> wrote:
>>>>>>> >>>
>>>>>>> >>> For the second exception, can you check the logs of the failing
>>>>>>> >>> taskmanager (10.105.200.137)?
>>>>>>> >>> I guess these logs contain some details on why the TM timed out.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Are you on 1.0.x or on 1.1-SNAPSHOT?
>>>>>>> >>> We recently changed something related to the ExecutionConfig, which
>>>>>>> >>> has led to Kryo issues in the past.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov
>>>>>>> >>> <timur.fairu...@gmail.com> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Trying to use ProtobufSerializer -- the program consistently fails
>>>>>>> >>>> with the following exception:
>>>>>>> >>>>
>>>>>>> >>>> java.lang.IllegalStateException: Update task on instance
>>>>>>> >>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>>>>>>> >>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>>>>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>>>>>>> >>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>>>>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>>>>>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>>>>>>> >>>> at
>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>>>>>>> >>>> at
>>>>>>> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>>>>>>> >>>> at
>>>>>>> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>>>>>>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>>>>>>> >>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>>>>> >>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>>>>>>> >>>> after [10000 ms]
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>>>>>>> >>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>>>>>>> >>>> at
>>>>>>> >>>>
>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>> >>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>>>
>>>>>>> >>>> I'm at my wits' end now; any suggestions are highly appreciated.
>>>>>>> >>>>
>>>>>>> >>>> Thanks,
>>>>>>> >>>> Timur
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov
>>>>>>> >>>> <timur.fairu...@gmail.com> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> Hello,
>>>>>>> >>>>>
>>>>>>> >>>>> I'm running a Flink program that is failing with the following
>>>>>>> >>>>> exception:
>>>>>>> >>>>>
>>>>>>> >>>>> 2016-04-23 02:00:38,947 ERROR
>>>>>>> org.apache.flink.client.CliFrontend
>>>>>>> >>>>> - Error while running the command.
>>>>>>> >>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>>> The program
>>>>>>> >>>>> execution failed: Job execution failed.
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>>>>>> >>>>> at scala.Option.foreach(Option.scala:257)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>>>>> >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> >>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> >>>>> Caused by:
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job
>>>>>>> >>>>> execution failed.
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>>> >>>>> at
>>>>>>> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>>>> >>>>> at
>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> >>>>> Caused by: java.lang.Exception: The data preparation for task
>>>>>>> 'CHAIN
>>>>>>> >>>>> CoGroup (CoGroup at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
>>>>>>> >>>>> Filter (Filter at
>>>>>>> >>>>>
>>>>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
>>>>>>> >>>>> caused an error: Error obtaining the sorted input: Thread
>>>>>>> 'SortMerger
>>>>>>> >>>>> Reading Thread' terminated due to an exception: No more bytes
>>>>>>> left.
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the
>>>>>>> sorted
>>>>>>> >>>>> input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>> exception: No
>>>>>>> >>>>> more bytes left.
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>>>>>>> >>>>> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>> >>>>> ... 3 more
>>>>>>> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading
>>>>>>> Thread'
>>>>>>> >>>>> terminated due to an exception: No more bytes left.
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> >>>>> Caused by: java.io.EOFException: No more bytes left.
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>>>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
>>>>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
>>>>>>> >>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>> >>>>> at
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> >>>>> at
>>>>>>> >>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>>>>>>> >>>>> at
>>>>>>> >>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >>>>> at
>>>>>>> >>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> >>>>> at
>>>>>>> >>>>>
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> >>>>>
>>>>>>> >>>>> The simplified version of the code looks more or less like the
>>>>>>> >>>>> following:
>>>>>>> >>>>> ```
>>>>>>> >>>>> case class Name(first: String, last: String)
>>>>>>> >>>>> case class Phone(number: String)
>>>>>>> >>>>> case class Address(addr: String, city: String, country: String)
>>>>>>> >>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
>>>>>>> >>>>> ...
>>>>>>> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
>>>>>>> >>>>> ...
>>>>>>> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
>>>>>>> >>>>>
>>>>>>> >>>>> val helper: DataSet[(Name, String)] = ...
>>>>>>> >>>>>
>>>>>>> >>>>> val result = data.filter(_.address.isDefined)
>>>>>>> >>>>>   .coGroup(helper)
>>>>>>> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
>>>>>>> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
>>>>>>> >>>>>   .apply {resolutionFunc}
>>>>>>> >>>>>   .filter(_ != "")
>>>>>>> >>>>>
>>>>>>> >>>>> result.writeAsText(...)
>>>>>>> >>>>> ```
>>>>>>> >>>>>
>>>>>>> >>>>> This code fails only when I run it on the full dataset; when I
>>>>>>> >>>>> split `data` into smaller chunks (`helper` always stays the same),
>>>>>>> >>>>> it completes successfully. I guess with smaller memory requirements
>>>>>>> >>>>> serialization/deserialization does not kick in.
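>>>>>>> >>>>>
>>>>>>> >>>>> To take the cluster out of the equation, I may also try
>>>>>>> >>>>> round-tripping a single record through Flink's serializer stack
>>>>>>> >>>>> locally. A sketch (untested; `sampleRecord` stands in for one of
>>>>>>> >>>>> my records):
>>>>>>> >>>>> ```
>>>>>>> >>>>> import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
>>>>>>> >>>>> import org.apache.flink.api.common.ExecutionConfig
>>>>>>> >>>>> import org.apache.flink.api.scala._
>>>>>>> >>>>> import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
>>>>>>> >>>>>
>>>>>>> >>>>> val serializer =
>>>>>>> >>>>>   createTypeInformation[Record].createSerializer(new ExecutionConfig)
>>>>>>> >>>>> val out = new ByteArrayOutputStream()
>>>>>>> >>>>> serializer.serialize(sampleRecord, new DataOutputViewStreamWrapper(out))
>>>>>>> >>>>> val in = new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>>>>>>> >>>>> assert(serializer.deserialize(in) == sampleRecord)
>>>>>>> >>>>> ```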
>>>>>>> >>>>>
>>>>>>> >>>>> I'm now trying to explicitly set the Protobuf serializer for Kryo:
>>>>>>> >>>>> ```
>>>>>>> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
>>>>>>> >>>>> classOf[ProtobufSerializer])
>>>>>>> >>>>>
>>>>>>> >>>>> ```
>>>>>>> >>>>> but every run takes significant time before failing, so any other
>>>>>>> >>>>> advice is appreciated.
>>>>>>> >>>>>
>>>>>>> >>>>> Thanks,
>>>>>>> >>>>> Timur
>>>>>>> >>>>
>>>>>>> >>>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>> --------------------------
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com
>> custom big data solutions & training
>> Hadoop, Cascading, Cassandra & Solr
>>
>>
>>
>>
>
